package com.shujia.kafka

import java.util
import java.util.Properties

import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer}

object Demo3Consumer {
  def main(args: Array[String]): Unit = {


    /**
      * 创建消费者
      *
      */
    val properties = new Properties()

    //1、指定kafkabroker地址
    properties.setProperty("bootstrap.servers", "master:9092,node1:9092,node2:9092")

    //2、指定kv的序列化类
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")



    //指定消费者组
    //同一条数据 在同一个消费者组内只处理一次
    properties.setProperty("group.id", "asdasdassssdssss")


    //是否自动提交消费偏移量
    properties.setProperty("enable.auto.commit", "false")

    //自动提交偏移量间隔时间
    //如果间隔时间太短会影响性能
    properties.put("auto.commit.interval.ms", "1000")


    /**
      * earliest
      * 当各分区下有已提交的offset时，从提交的offset开始消费；无提交的offset时，从头开始消费
      * latest
      * 当各分区下有已提交的offset时，从提交的offset开始消费；无提交的offset时，消费新产生的该分区下的数据
      * none
      * topic各分区都存在已提交的offset时，从offset后开始消费；只要有一个分区不存在已提交的offset，则抛出异常
      *
      */


    //从最早读取数据
    properties.put("auto.offset.reset", "earliest")


    //创建消费者
    val consumer = new KafkaConsumer[String, String](properties)


    val topics = new util.ArrayList[String]()
    topics.add("student")
    //订约topic
    consumer.subscribe(topics)


    while (true) {
      println("正在消费数据")

      //读取数据， 一次读取部分数据
      val records: ConsumerRecords[String, String] = consumer.poll(1000)


      //解析数据
      val lines: util.Iterator[ConsumerRecord[String, String]] = records.iterator()

      while (lines.hasNext) {
        //一行数据
        val record: ConsumerRecord[String, String] = lines.next()

        val key: String = record.key()
        val value: String = record.value()
        val offset: Long = record.offset()
        val partition: Int = record.partition()
        val ts: Long = record.timestamp()
        val topic: String = record.topic()

        println(key + "\t" + value + "\t" + offset + "\t" + partition + "\t" + ts + "\t" + topic)

      }

      //手动提交偏移量
      consumer.commitSync()
    }


  }
}
